14 实践课-MySQL 数据库连接与基础查询工具开发
MySQL 数据库连接与基础查询工具开发
关联:索引
AI 工具使用:数据库连接 / SQL 优化 / 报错排查提示词模板(学生可直接复制)
使用方法:把你的表结构、SQL、报错信息、环境(Windows/Conda/依赖版本)粘贴到
{你的内容};要求 AI 输出“步骤清单 + 可复制代码 + 校验点”,便于照做与复盘。
模板目录:
- 模板 1:生成最小可运行的 MySQL 连接与查询代码(带连接池)
- 模板 2:根据分拣场景表结构生成 3 条查询语句(含索引/优化建议)
- 模板 3:SQL 注入风险评审(你写的 SQL/代码哪里不安全)
- 模板 4:连接报错排查(定位→修复→预防)
模板 1:生成最小可运行连接代码(带连接池 + 参数化查询)
你是 Python 实践课助教。请给我一份“最小可运行”的 MySQL 连接与查询示例,要求:
1)使用 mysql-connector-python;使用连接池(pool_name/pool_size);
2)配置从环境变量读取:MYSQL_HOST/MYSQL_PORT/MYSQL_USER/MYSQL_PASSWORD/MYSQL_DATABASE;
3)提供一个函数 select(sql, params) 只允许 SELECT,必须使用参数化查询,占位符用 %s;
4)返回结果为 list[dict](带字段名);
5)加入最基本安全校验:禁止多语句与危险关键字,给出可执行的报错信息与修复建议;
6)给出一段自测:跑一个条件查询和一个聚合查询。
我的环境与表结构:{你的内容}
模板 2:生成分拣场景 SQL(条件 + 聚合 + 优化)
下面是我分拣场景的 MySQL 表结构(含字段与索引)。请你:
1)生成 3 条 SQL:①条件查询(带排序与分页);②聚合查询(按产线/日期/品级汇总);③联表查询(分拣记录 + 批次 + 品质标准,可选再关联设备状态);
2)所有查询都必须可参数化(给出 SQL + params 示例);
3)指出每条 SQL 的潜在慢点,并给出索引建议或重写建议;
4)不要使用 SELECT *,只选必要字段。
表结构:{你的内容}
模板 3:SQL 注入风险评审(代码/SQL 安全审计)
请对我下面的数据库查询代码与 SQL 做一次安全审计,输出:
1)注入风险点(逐条指出:字符串拼接/动态表名/多语句/注释绕过等);
2)修复方案(用参数化、白名单、最小权限、只读账号等);
3)给出修复后的代码片段(可直接替换);
4)提供 3 个“恶意输入”用例用于验证是否已防注入。
我的代码与 SQL:{你的内容}
模板 4:连接报错排查(定位→修复→预防)
下面是我连接 MySQL 的报错信息、代码片段与环境信息。请按“定位→修复→预防”输出:
1)先判断属于哪类:账号权限/密码、网络与端口、防火墙、数据库名不存在、字符集、连接池耗尽、超时;
2)给出最短排查步骤(最多 8 步),每一步要写清我该检查什么、怎么验证;
3)给出修复后的代码改动建议(要点即可);
4)给出预防清单(3 条以内)。
我的内容:{你的内容}
- 场景提问:分拣线上出现“某批次苹果瑕疵率升高”,你需要快速回答两件事:①到底升高了吗?②是哪个设备/班次更明显?
1. 为什么先讲权限?
-
工业数据场景里,数据库不仅是“能连上”,更要“连上后不越权、不泄露、不被注入”。
-
建议准备两个账号:
-
greensort_ro:只读账号(仅 SELECT 指定库的指定表)。
如果你使用的是学校统一 MySQL 环境,可能不允许你创建用户/建库:那就跳过“创建用户/建库”,直接使用老师发的只读账号与数据库名。
1. 登录 MySQL(命令行方式)
mysql -h 127.0.0.1 -P 3306 -u root -p
关键点解释:
-h:MySQL 主机地址;本机通常用127.0.0.1或localhost。-P:端口;MySQL 默认3306。-p:提示输入密码(不会在命令行明文显示)。
2. 创建数据库与只读账号(演示版)
项目路径:08_greensort_system/scripts/db/00_create_db_and_user.sql
CREATE DATABASE IF NOT EXISTS greensort_db DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
# 可选操作
CREATE USER IF NOT EXISTS 'greensort_ro'@'%' IDENTIFIED BY 'Replace_With_Strong_Password';
GRANT SELECT ON greensort_db.* TO 'greensort_ro'@'%';
FLUSH PRIVILEGES;
关键点解释:
utf8mb4:推荐字符集,支持更完整的 Unicode;避免中文/符号乱码。utf8mb4_unicode_ci:兼容性更好(MySQL 5.7 / MySQL 8.0 / 大多数 MariaDB 都支持)。如果你使用 MySQL 8.0 也可以换成utf8mb4_0900_ai_ci,但旧版本会报Unknown collation。greensort_ro@'%':%表示允许从任意主机连接;若在内网生产环境,应改成固定网段或固定 IP(更安全)。GRANT SELECT:只给 SELECT,符合最小权限;不授予写入与结构变更权限。- 强密码:不要用
123456;工业场景必须满足口令强度与轮换要求。
排错提示(遇到 collation 报错时用):
SELECT VERSION();
SHOW COLLATION LIKE 'utf8mb4%';
关键点解释:
SELECT VERSION():确认是 MySQL 5.7 / 8.0 还是 MariaDB,不同版本支持的排序规则不同。
安全提示(必须强调):
3. 建表:分拣场景业务数据结构(对齐项目数据库设计文档)
项目路径:08_greensort_system/scripts/db/01_schema.sql
USE greensort_db;
CREATE TABLE IF NOT EXISTS sorting_batch (
id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
batch_no VARCHAR(32) NOT NULL UNIQUE,
fruit_type VARCHAR(32) NOT NULL DEFAULT 'xibeilei_apple',
line_id VARCHAR(16) NOT NULL DEFAULT 'LINE-01',
operator_id BIGINT UNSIGNED DEFAULT NULL,
start_time DATETIME NOT NULL,
end_time DATETIME NULL,
total_count INT UNSIGNED NOT NULL DEFAULT 0,
grade_a_count INT UNSIGNED NOT NULL DEFAULT 0,
grade_b_count INT UNSIGNED NOT NULL DEFAULT 0,
grade_c_count INT UNSIGNED NOT NULL DEFAULT 0,
grade_d_count INT UNSIGNED NOT NULL DEFAULT 0,
grade_e_count INT UNSIGNED NOT NULL DEFAULT 0,
status TINYINT NOT NULL DEFAULT 1,
remark VARCHAR(255) NULL,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_batch_no (batch_no),
INDEX idx_line_date (line_id, start_time),
INDEX idx_fruit_type (fruit_type),
INDEX idx_status (status)
);
CREATE TABLE IF NOT EXISTS sorting_record (
id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
batch_id BIGINT UNSIGNED NOT NULL,
fruit_id VARCHAR(64) NOT NULL UNIQUE,
line_id VARCHAR(32) NOT NULL,
diameter_mm FLOAT NOT NULL,
color_coverage_pct FLOAT NOT NULL,
defect_count INT NOT NULL DEFAULT 0,
defect_types JSON NULL,
shape_index FLOAT NOT NULL,
surface_smoothness FLOAT NOT NULL,
overall_score FLOAT NOT NULL,
grade ENUM('A','B','C','D','E') NOT NULL,
target_zone INT NOT NULL,
cycle_time_ms INT NOT NULL,
image_path VARCHAR(512) NULL,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_batch_created (batch_id, created_at),
INDEX idx_line_grade_date (line_id, grade, created_at),
INDEX idx_grade_time (grade, created_at),
CONSTRAINT fk_sorting_batch FOREIGN KEY (batch_id) REFERENCES sorting_batch(id) ON DELETE RESTRICT
);
CREATE TABLE IF NOT EXISTS quality_standard (
id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
fruit_type VARCHAR(32) NOT NULL,
grade ENUM('A','B','C','D','E') NOT NULL,
min_diameter_mm FLOAT NOT NULL,
max_diameter_mm FLOAT NULL,
min_color_coverage_pct FLOAT NOT NULL,
max_defect_count INT NOT NULL,
min_shape_index FLOAT NOT NULL,
target_zone INT NOT NULL,
weight_config JSON NOT NULL,
is_active TINYINT(1) NOT NULL DEFAULT 1,
version INT NOT NULL DEFAULT 1,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY uk_fruit_grade_version (fruit_type, grade, version),
INDEX idx_active (is_active)
);
CREATE TABLE IF NOT EXISTS equipment_status (
id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
device_id VARCHAR(64) NOT NULL,
device_type ENUM('arm','agv','camera','conveyor') NOT NULL,
line_id VARCHAR(32) NULL,
status ENUM('online','offline','error','maintenance') NOT NULL,
operation_mode ENUM('auto','manual','idle') NULL,
battery_percent INT NULL,
position JSON NULL,
error_code VARCHAR(32) NULL,
last_heartbeat DATETIME NULL,
is_latest TINYINT(1) NOT NULL DEFAULT 1,
reported_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
INDEX idx_device_latest (device_id, is_latest),
INDEX idx_device_time (device_id, reported_at),
INDEX idx_status_latest (status, is_latest)
);
关键点解释(对应项目设计文档的统一口径):
- 核心业务表:
sorting_batch:批次(batch_no、line_id、status),包含operator_id与grade_*_count/total_count等统计冗余字段,用于进度跟踪与快速汇总。sorting_record:分拣记录(每个fruit_id一条),是查询量最大的核心事实表。quality_standard:品质标准(按fruit_type + grade + version管理),用于解释“为什么分到该品级/区域”。equipment_status:设备状态(ROS2 节点上报),用于判断“设备是否异常/是否在线”。- 字段口径对齐:
- 产线:
line_id形如LINE-01 - 果实:
fruit_id形如APPLE-YYYYMMDD-000001 - 品级:
grade取值A/B/C/D/E - 索引设计:优先服务“按产线+时间范围”“按批次+时间”“按果实ID”这三类高频查询;避免过度索引。
4. 插入少量演示数据(用于自测)
项目路径:08_greensort_system/scripts/db/02_seed.sql
USE greensort_db;
INSERT INTO sorting_batch(batch_no, fruit_type, line_id, start_time, status)
VALUES ('BATCH-20260409-001', 'xibeilei_apple', 'LINE-01', NOW(), 1);
INSERT INTO quality_standard(
fruit_type, grade,
min_diameter_mm, max_diameter_mm, min_color_coverage_pct, max_defect_count, min_shape_index,
target_zone, weight_config, is_active, version
)
VALUES
('xibeilei_apple', 'A', 145, NULL, 90, 0, 90, 1, '{"着色":40, "果径":25, "缺陷":25, "果形":10}', 1, 1),
('xibeilei_apple', 'B', 135, 145, 80, 1, 85, 2, '{"着色":40, "果径":25, "缺陷":25, "果形":10}', 1, 1);
INSERT INTO sorting_record(
batch_id, fruit_id, line_id,
diameter_mm, color_coverage_pct, defect_count, defect_types, shape_index, surface_smoothness, overall_score,
grade, target_zone, cycle_time_ms, image_path
)
VALUES
(1, 'APPLE-20260409-000001', 'LINE-01', 145.2, 92.0, 0, JSON_ARRAY(), 92.0, 88.0, 90.2, 'A', 1, 8200, '/uploads/2026/04/09/apple_000001.jpg'),
(1, 'APPLE-20260409-000002', 'LINE-01', 138.5, 85.0, 1, JSON_ARRAY('bruise'), 86.0, 80.0, 82.5, 'B', 2, 9100, '/uploads/2026/04/09/apple_000002.jpg');
INSERT INTO equipment_status(device_id, device_type, line_id, status, operation_mode, battery_percent, is_latest, last_heartbeat)
VALUES
('camera_001', 'camera', 'LINE-01', 'online', 'auto', NULL, 1, NOW()),
('agv_001', 'agv', 'LINE-01', 'online', 'auto', 78, 1, NOW());
关键点解释:
quality_standard:用于“记录 → 批次 → 标准”的联表追溯示例;如果不插入这张表的数据,联表查询可能查不到对应的标准行。NOW():用当前时间生成时间字段,便于按时间窗口做筛选与聚合。- 演示数据量要小:目标是“能跑通查询并验证返回结构”,不是压测数据库。
1. 条件查询:按批次与等级筛选(带排序与分页)
SELECT
id, batch_id, fruit_id, line_id, diameter_mm, overall_score, grade, target_zone, created_at
FROM sorting_record
WHERE batch_id = 1
AND grade IN ('A', 'B')
ORDER BY created_at DESC
LIMIT 20;
关键点解释:
-
不用
SELECT *:只取需要的字段,减少 IO,结果更清晰。 -
WHERE batch_id = ...:典型业务筛选条件;配合idx_batch_created更快。 -
ORDER BY ... LIMIT ...:分页与“看最近数据”的典型模式。 -
上面示例把值写死在 SQL 里是为了讲清结构;真正写代码时必须参数化,不要拼接字符串。
2. 聚合查询:按产线统计今日品级分布与平均分拣周期
SELECT
grade,
COUNT(*) AS cnt,
AVG(cycle_time_ms) AS avg_cycle_time_ms
FROM sorting_record
WHERE line_id = 'LINE-01'
AND created_at >= CURDATE()
AND created_at < DATE_ADD(CURDATE(), INTERVAL 1 DAY)
GROUP BY grade
ORDER BY grade;
关键点解释:
COUNT(*):数量统计;AVG(cycle_time_ms):平均分拣周期,常用于吞吐分析。- 真实项目建议避免对索引列做函数包裹(如
DATE(created_at)),否则容易导致索引失效;优先用“时间范围”写法(如上面>= CURDATE()且< CURDATE()+1day)。 - 若数据量大:聚合会更吃资源;要尽量减少扫描范围(先用 WHERE 缩小集合)。
3. 联表查询(进阶预告):把“分拣记录 → 批次 → 品质标准”串成可追溯证据
SELECT
sr.created_at,
sb.batch_no,
sr.fruit_id,
sr.line_id,
sr.grade,
sr.overall_score,
qs.min_diameter_mm,
qs.min_color_coverage_pct,
qs.max_defect_count,
qs.target_zone AS standard_target_zone
FROM sorting_record sr
JOIN sorting_batch sb ON sb.id = sr.batch_id
JOIN quality_standard qs
ON qs.fruit_type = sb.fruit_type
AND qs.grade = sr.grade
AND qs.is_active = 1
WHERE sr.batch_id = 1
ORDER BY sr.created_at DESC
LIMIT 20;
关键点解释:
-
JOIN:把“记录→批次→标准”串起来,回答“为什么是这个品级/这个目标区域”,便于解释与审计。 -
只取必要字段:联表查询更要克制字段数量。
-
只读账号先行:默认只给 SELECT,避免越权与误操作。
-
查询先跑通再优化:先做正确性验证(字段/口径/过滤条件),再谈索引与性能。
-
常见错误:
-
Access denied:账号/密码错误或权限不足(用只读账号却试图写入也会触发)。 -
Unknown database:数据库名写错或未创建。 -
You have an error in your SQL syntax:拼写/逗号/引号问题,先用最小语句定位。
- 目标提醒:今天不是写“任意 SQL 执行器”,而是写“安全可控的查询工具”。
二、项目视角:把代码串起来(文件级连线)
-
08_greensort_system/scripts/db/00_create_db_and_user.sql:创建
greensort_db与只读账号greensort_ro(最小权限) -
08_greensort_system/scripts/db/01_schema.sql:建表(
sorting_record/sorting_batch/quality_standard/equipment_status) -
08_greensort_system/scripts/db/02_seed.sql:灌入少量演示数据(用于本节课自测)
-
08_greensort_system/backend/common/mysql_client.py:(可选对照)直连数据库原型
MySQLClient.select()(只读 + 参数化 + 基础安全校验) -
08_greensort_system/backend/tools/query_sorting_records.py:Tool 封装(调用 FastAPI 接口
/api/v1/sorting/records,对齐 API 的筛选字段与分页上限) -
08_greensort_system/backend/app/core/database.py:项目一致实现(SQLAlchemy + aiomysql,主库/只读库分离)
-
08_greensort_system/backend/app/core/database_deps.py:FastAPI 依赖注入(
get_db()),把连接生命周期收敛到一次请求 -
08_greensort_system/backend/schemas/sorting.py:Pydantic 数据结构(分页响应、记录字段口径)
-
08_greensort_system/backend/repositories/sorting_repo.py:数据访问层(参数化查询、时间范围过滤、分页)
-
08_greensort_system/backend/services/sorting_service.py:业务层(口径与权限入口)
-
08_greensort_system/backend/app/api/v1/sorting.py:路由层(对齐 API 文档的查询参数)
-
访问路径:前端不直连数据库,统一通过 FastAPI(Bearer Token + 权限校验)访问;数据库账号权限再做“最小权限兜底”。
-
分层习惯:路由(API)→ Service(业务)→ Repository(数据访问)→ DB(Session/连接池)。本推荐 Tool 调用 FastAPI 接口来复用这条链路,避免 Tool 直连数据库。
-
查询写法:尽量避免
DATE(created_at)这类函数包裹索引列;用时间范围(created_at >= ... AND created_at < ...)更贴近索引/分区的真实收益。
工具函数访问数据库:两种方式怎么选(学生必读)
- 方式 A(企业更常见,推荐):Tool → HTTP(httpx)→ FastAPI → Repository/DB
- 方式 B(存在但更受控):Tool → 直连 DB(MySQLClient/SQLAlchemy)→ DB
对比要点:
- 安全与治理:
- A:认证鉴权、审计日志、限流、脱敏集中在 API 层,Tool 不需要持有数据库账号,更安全。
- B:Tool 需要数据库账号或数据库网络访问权限,凭据分发与轮换成本高,审计与权限更难统一。
- 架构边界:
- A:数据库只对后端服务开放(内网/安全组),Tool 只访问 API,边界清晰。
- B:数据库暴露面扩大,更多运行环境需要直达数据库。
- 适用场景:
- A:智能体工具、前端/脚本、跨系统集成等“在后端服务进程之外运行”的 Tool。
- B:同进程内工具函数、受控运维脚本、数据平台批处理(通常直连只读库/数仓,且强审计)。
本的选择:
- 第 4 步给出项目正式接口路线(FastAPI + SQLAlchemy 分层)。
- 第 5 步的 Tool 采用方式 A:用 httpx 调用 FastAPI 接口(更贴近企业级落地)。
MySQLClient作为“可选对照原型”:用于理解参数化查询、只读约束、连接池与基础注入防护,但不作为推荐生产落地方式。
包导入运行示例(建议直接复制):
cd 08_greensort_system/backend
# 启动 FastAPI(项目正式接口路线)
python -m uvicorn app.main:app --host 127.0.0.1 --port 8000
1. 安装依赖(示例)
项目路径:08_greensort_system/backend/requirements.txt(依赖清单)
pip install mysql-connector-python==8.4.0 sqlalchemy==2.0.30 aiomysql==0.2.0 httpx==0.27.0 python-dotenv==1.2.2
关键点解释:
mysql-connector-python:MySQL 官方驱动之一,支持连接池与参数化查询。python-dotenv:从.env加载环境变量,避免把账号密码写在代码里。httpx:用于 Tool 调用 FastAPI 接口(推荐路线)。sqlalchemy+aiomysql:用于本“项目版实现(FastAPI + SQLAlchemy AsyncSession)”的小节;如果你只做MySQLClient(mysql-connector-python)版本,可暂不安装这两项。
2. 配置 .env(不要提交仓库)
项目路径:08_greensort_system/backend/.env.example(复制为 .env 使用)
MYSQL_HOST=127.0.0.1
MYSQL_PORT=3306
MYSQL_USER=greensort_ro
MYSQL_PASSWORD=Replace_With_Strong_Password
MYSQL_DATABASE=greensort_db
MYSQL_READONLY_USER=greensort_ro
MYSQL_READONLY_PASSWORD=Replace_With_Strong_Password
MYSQL_POOL_NAME=sorting_pool
MYSQL_POOL_SIZE=5
MYSQL_CONN_TIMEOUT=5
关键点解释:
-
MYSQL_CONN_TIMEOUT:连接超时(秒);用于避免“卡死式等待”。 -
项目常见配置形态是“主库读写 + 从库只读”,并在应用侧配置连接池参数(如
pool_size / max_overflow / pool_recycle / pool_pre_ping)。
3.(可选对照)直连数据库原型:MySQLClient(mysql-connector-python)
项目路径:08_greensort_system/backend/common/mysql_client.py
定位说明(先讲清边界):
- 真实项目推荐把数据访问收敛在 FastAPI 后端,并让 Tool 调用后端接口(见第 5 步),避免 Tool 直接持有数据库账号与权限。
import os
import re
import uuid
from dataclasses import dataclass
from typing import Any
from dotenv import load_dotenv
from mysql.connector.pooling import MySQLConnectionPool
_DISALLOWED_SQL = re.compile(
r"(?is)\b(insert|update|delete|drop|alter|truncate|create|grant|revoke)\b|;|--|/\*|\*/"
)
@dataclass(frozen=True)
class QueryResult:
trace_id: str
rows: list[dict[str, Any]]
class MySQLClient:
def __init__(self) -> None:
load_dotenv()
self._pool = self._build_pool()
def _build_pool(self) -> MySQLConnectionPool:
host = os.getenv("MYSQL_HOST", "127.0.0.1")
port = int(os.getenv("MYSQL_PORT", "3306"))
user = os.getenv("MYSQL_USER", "")
password = os.getenv("MYSQL_PASSWORD", "")
database = os.getenv("MYSQL_DATABASE", "")
pool_name = os.getenv("MYSQL_POOL_NAME", "sorting_pool")
pool_size = int(os.getenv("MYSQL_POOL_SIZE", "5"))
conn_timeout = int(os.getenv("MYSQL_CONN_TIMEOUT", "5"))
if not user or not password or not database:
raise RuntimeError("MySQL 配置缺失:请检查 .env 中 MYSQL_USER/MYSQL_PASSWORD/MYSQL_DATABASE")
return MySQLConnectionPool(
pool_name=pool_name,
pool_size=pool_size,
host=host,
port=port,
user=user,
password=password,
database=database,
connection_timeout=conn_timeout,
)
def select(self, sql: str, params: tuple[Any, ...] | None = None) -> QueryResult:
trace_id = uuid.uuid4().hex[:8]
normalized = (sql or "").strip()
if not normalized:
raise ValueError(f"EMPTY_SQL trace_id={trace_id}")
if not normalized.lower().startswith("select"):
raise ValueError(f"ONLY_SELECT_ALLOWED trace_id={trace_id}")
if _DISALLOWED_SQL.search(normalized):
raise ValueError(f"SQL_NOT_ALLOWED trace_id={trace_id}")
cnx = self._pool.get_connection()
try:
cur = cnx.cursor(dictionary=True)
cur.execute(normalized, params or ())
rows = list(cur.fetchall())
cur.close()
return QueryResult(trace_id=trace_id, rows=rows)
finally:
cnx.close()
def _self_test() -> None:
db = MySQLClient()
r1 = db.select(
"""
SELECT id, batch_id, fruit_id, line_id, diameter_mm, overall_score, grade, target_zone, created_at
FROM sorting_record
WHERE batch_id = %s AND grade IN (%s, %s)
ORDER BY created_at DESC
LIMIT %s
""",
(1, "A", "B", 20),
)
print("trace_id:", r1.trace_id, "rows:", len(r1.rows))
r2 = db.select(
"""
SELECT grade, COUNT(*) AS cnt, AVG(cycle_time_ms) AS avg_cycle_time_ms
FROM sorting_record
WHERE line_id = %s AND created_at >= %s AND created_at < %s
GROUP BY grade
ORDER BY grade
""",
("LINE-01", "2026-04-09 00:00:00", "2026-04-10 00:00:00"),
)
print("trace_id:", r2.trace_id, "rows:", r2.rows)
if __name__ == "__main__":
_self_test()
关键点解释(自检用,建议逐条对照):
-
load_dotenv():从.env加载环境变量,避免密码硬编码。 -
MySQLConnectionPool(...):创建连接池;pool_size控制并发连接上限。 -
select(sql, params):只提供只读查询入口;把“安全约束”集中在一个地方更可控。 -
startswith("select"):最基础的“只读拦截”;并不完美,但能阻断大部分误操作。 -
_DISALLOWED_SQL:额外拦截多语句;、注释-- /* */、以及常见危险动词;用于降低注入与误执行风险。 -
cur.execute(normalized, params):参数化查询,避免把用户输入拼进 SQL 字符串。 -
cursor(dictionary=True):返回list[dict],便于后续 Tool/接口做结构化输出。 -
cnx.close():连接归还给连接池;忘记关闭会导致池耗尽(典型“能跑一会儿就挂”)。 -
对于“动态表名/字段名”的需求:不要让用户直接传表名/字段名;必须做白名单映射(见下节)。
补充:项目一致实现(SQLAlchemy + aiomysql,了解即可)
项目路径:08_greensort_system/backend/app/core/database.py
import os
from dotenv import load_dotenv
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
load_dotenv()
def _get_env_int(key: str, default: int) -> int:
v = os.getenv(key)
if v is None or v == "":
return default
return int(v)
def _build_mysql_url_from_env(*, user_key: str, password_key: str) -> str:
host = os.getenv("MYSQL_HOST", "127.0.0.1")
port = os.getenv("MYSQL_PORT", "3306")
database = os.getenv("MYSQL_DATABASE", "greensort_db")
user = os.getenv(user_key, "greensort")
password = os.getenv(password_key, "greensort123")
return f"mysql+aiomysql://{user}:{password}@{host}:{port}/{database}?charset=utf8mb4"
DATABASE_URL = os.getenv("DATABASE_URL") or _build_mysql_url_from_env(
user_key="MYSQL_USER",
password_key="MYSQL_PASSWORD",
)
DATABASE_READONLY_URL = os.getenv("DATABASE_READONLY_URL") or _build_mysql_url_from_env(
user_key="MYSQL_READONLY_USER",
password_key="MYSQL_READONLY_PASSWORD",
)
POOL_SIZE = _get_env_int("SQLALCHEMY_POOL_SIZE", 20)
MAX_OVERFLOW = _get_env_int("SQLALCHEMY_MAX_OVERFLOW", 40)
POOL_RECYCLE = _get_env_int("SQLALCHEMY_POOL_RECYCLE", 3600)
POOL_PRE_PING = os.getenv("SQLALCHEMY_POOL_PRE_PING", "true").lower() in {"1", "true", "yes"}
READONLY_POOL_SIZE = _get_env_int("SQLALCHEMY_READONLY_POOL_SIZE", 10)
READONLY_MAX_OVERFLOW = _get_env_int("SQLALCHEMY_READONLY_MAX_OVERFLOW", 20)
engine = create_async_engine(
DATABASE_URL,
echo=False,
pool_size=POOL_SIZE,
max_overflow=MAX_OVERFLOW,
pool_recycle=POOL_RECYCLE,
pool_pre_ping=POOL_PRE_PING,
)
engine_readonly = create_async_engine(
DATABASE_READONLY_URL,
echo=False,
pool_size=READONLY_POOL_SIZE,
max_overflow=READONLY_MAX_OVERFLOW,
pool_recycle=POOL_RECYCLE,
pool_pre_ping=POOL_PRE_PING,
)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
关键点解释(与设计文档字段一一对应):
DATABASE_URL / DATABASE_READONLY_URL:主库与只读库的连接串;只读库适合报表与统计,降低主库压力。pool_size / max_overflow:常驻连接与临时连接上限;值过大可能把数据库连接数打满(结合数据库侧max_connections配置评估)。pool_recycle:连接回收时间,降低“长连接被服务端断开导致偶发报错”的概率。pool_pre_ping:每次取连接前先做可用性检测,减少“拿到坏连接”的情况。
4. 项目版实现:FastAPI + SQLAlchemy(Repository/Service/Router,推荐结构)
项目设计文档采用 FastAPI + Pydantic 的后端服务层。真实项目中,查询通常不直接暴露“任意 SQL”,而是走:路由(API)→ Service(业务)→ Repository(数据访问)→ 数据库(AsyncSession/连接池)。下面给出一套与项目文档一致、且能把“分页/筛选/安全”讲清楚的最小结构。
项目路径:08_greensort_system/backend/app/core/database_deps.py
from collections.abc import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import AsyncSessionLocal
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSessionLocal() as session:
yield session
关键点解释:
get_db():FastAPI 常用的依赖注入方式,把连接生命周期收敛到一次请求内,避免忘记关闭连接。
项目路径:08_greensort_system/backend/schemas/sorting.py
from datetime import datetime
from typing import Any
from pydantic import BaseModel
class SortingRecordOut(BaseModel):
id: int
batch_id: int
fruit_id: str
line_id: str
diameter_mm: float
grade: str
overall_score: float
created_at: datetime
image_path: str | None = None
class PageResponse(BaseModel):
total: int
page: int
page_size: int
items: list[SortingRecordOut]
trace_id: str
extra: dict[str, Any] | None = None
关键点解释:
SortingRecordOut:与 API 接口文档中的字段保持一致(核心字段 + 可选image_path)。
项目路径:08_greensort_system/backend/repositories/sorting_repo.py
import uuid
from datetime import datetime, timedelta
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
class SortingRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def list_records(
self,
*,
page: int,
page_size: int,
line_id: str | None,
grade: str | None,
batch_id: int | None,
start_date: str | None,
end_date: str | None,
) -> tuple[int, list[dict], str]:
trace_id = uuid.uuid4().hex[:8]
if page < 1:
raise ValueError("page must be >= 1")
if page_size < 1 or page_size > 100:
raise ValueError("page_size must be 1..100")
where_parts: list[str] = []
params: dict[str, object] = {}
if line_id:
where_parts.append("line_id = :line_id")
params["line_id"] = line_id
if grade:
where_parts.append("grade = :grade")
params["grade"] = grade
if batch_id is not None:
where_parts.append("batch_id = :batch_id")
params["batch_id"] = batch_id
if start_date:
start_dt = datetime.strptime(start_date, "%Y-%m-%d")
where_parts.append("created_at >= :start_dt")
params["start_dt"] = start_dt
if end_date:
end_dt_exclusive = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
where_parts.append("created_at < :end_dt_exclusive")
params["end_dt_exclusive"] = end_dt_exclusive
where_sql = (" WHERE " + " AND ".join(where_parts)) if where_parts else ""
total_sql = text(f"SELECT COUNT(*) AS total FROM sorting_record{where_sql}")
total = int((await self._session.execute(total_sql, params)).mappings().one()["total"])
offset = (page - 1) * page_size
data_sql = text(
f"""
SELECT id, batch_id, fruit_id, line_id, diameter_mm, grade, overall_score, created_at, image_path
FROM sorting_record
{where_sql}
ORDER BY created_at DESC
LIMIT :limit OFFSET :offset
"""
)
rows = (
(await self._session.execute(data_sql, {**params, "limit": page_size, "offset": offset}))
.mappings()
.all()
)
return total, [dict(r) for r in rows], trace_id
关键点解释(与安全口径一致):
- 依然是“参数化”查询:这里用 SQLAlchemy
text()+ 命名参数(:line_id),避免字符串拼接注入。 - 日期范围使用
created_at >= ... AND created_at < ...,避免DATE(created_at)导致索引失效。 page_size上限与 API 文档一致(最大 100),避免一次性拉取过多数据。
项目路径:08_greensort_system/backend/services/sorting_service.py
from sqlalchemy.ext.asyncio import AsyncSession
from repositories.sorting_repo import SortingRepository
class SortingService:
def __init__(self, session: AsyncSession) -> None:
self._repo = SortingRepository(session)
async def list_records(self, **kwargs):
return await self._repo.list_records(**kwargs)
关键点解释:
- Service 层负责“业务口径与权限规则”;Repository 层负责“数据库访问与性能口径”。
- 本节课只做只读查询演示;真实项目可在 Service 中加入“权限(按用户角色限定 line_id 可见范围)”的校验。
项目路径:08_greensort_system/backend/app/api/v1/sorting.py
from fastapi import APIRouter, Depends, Query
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database_deps import get_db
from schemas.sorting import PageResponse, SortingRecordOut
from services.sorting_service import SortingService
router = APIRouter(prefix="/api/v1/sorting", tags=["sorting"])
@router.get("/records", response_model=PageResponse)
async def list_sorting_records(
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
line_id: str | None = None,
grade: str | None = None,
start_date: str | None = None,
end_date: str | None = None,
batch_id: int | None = None,
db: AsyncSession = Depends(get_db),
):
service = SortingService(db)
total, rows, trace_id = await service.list_records(
page=page,
page_size=page_size,
line_id=line_id,
grade=grade,
batch_id=batch_id,
start_date=start_date,
end_date=end_date,
)
return PageResponse(
total=total,
page=page,
page_size=page_size,
items=[SortingRecordOut(**r) for r in rows],
trace_id=trace_id,
)
关键点解释:
- 与 API 接口文档一致:分页参数
page/page_size,筛选参数line_id/grade/start_date/end_date/batch_id。 - 真实项目通常会加认证依赖(Bearer Token)与权限校验(如
Depends(get_current_user)),并在 Service 层用角色权限过滤可见范围。
5. 将接口封装成“分拣场景工具函数”(Tool 调用 FastAPI,推荐)
项目路径:08_greensort_system/backend/tools/query_sorting_records.py
import json
import uuid
import httpx
def query_sorting_records(
page: int = 1,
page_size: int = 20,
line_id: str | None = None,
grade: str | None = None,
start_date: str | None = None,
end_date: str | None = None,
batch_id: int | None = None,
api_base_url: str = "http://127.0.0.1:8000",
token: str | None = None,
) -> str:
trace_id = uuid.uuid4().hex[:8]
if not api_base_url:
return json.dumps({"ok": False, "error": "api_base_url is empty", "trace_id": trace_id}, ensure_ascii=False)
api_base_url = api_base_url.rstrip("/")
url = f"{api_base_url}/api/v1/sorting/records"
headers: dict[str, str] = {}
if token:
headers["Authorization"] = f"Bearer {token}"
params: dict[str, object] = {"page": page, "page_size": page_size}
if line_id is not None:
params["line_id"] = line_id
if grade is not None:
params["grade"] = grade
if batch_id is not None:
params["batch_id"] = batch_id
if start_date is not None:
params["start_date"] = start_date
if end_date is not None:
params["end_date"] = end_date
try:
with httpx.Client(timeout=10.0) as client:
resp = client.get(url, params=params, headers=headers)
return json.dumps(
{
"ok": resp.is_success,
"trace_id": trace_id,
"status_code": resp.status_code,
"data": resp.json() if resp.headers.get("content-type", "").startswith("application/json") else resp.text,
},
ensure_ascii=False,
)
except Exception as e:
return json.dumps({"ok": False, "trace_id": trace_id, "error": str(e)}, ensure_ascii=False)
运行建议(包导入方式):
cd 08_greensort_system/backend
# 先启动服务(另开一个终端窗口运行)
python -m uvicorn app.main:app --host 127.0.0.1 --port 8000
# 打开新的终端
python -c "from tools.query_sorting_records import query_sorting_records; print(query_sorting_records(page=1, page_size=10, line_id='LINE-01', grade='A', batch_id=1, start_date=None, end_date=None))"
关键点解释:
- Tool 不直连数据库:通过 FastAPI 接口拿数据,可以复用项目的认证、权限、审计与限流策略。
api_base_url:接口地址,默认本机http://127.0.0.1:8000;在部署环境替换成服务域名。token:可选 Bearer Token;项目接入认证后由 Tool 统一携带,避免把数据库账号暴露给 Tool。page_size上限:与路由层Query(le=100)保持一致,防止一次性拉取过多数据。
扩展:Tool 直连数据库(MySQLClient 方式,不推荐生产环境)
这个扩展用于理解“当 Tool 必须在受控环境里直连 DB 时,如何把直连查询封装成安全可控的工具函数”。真实项目更推荐上一节的“Tool 调用 FastAPI”方式。
项目路径:08_greensort_system/backend/common/mysql_client.py(复用 MySQLClient)
import json
import uuid
from datetime import date, datetime, timedelta
from decimal import Decimal
from typing import Any
from common.mysql_client import MySQLClient
_ALLOWED_GRADES = {"A", "B", "C", "D", "E"}
def _to_jsonable(value: Any) -> Any:
if value is None:
return None
if isinstance(value, (str, int, float, bool)):
return value
if isinstance(value, (datetime, date)):
return value.isoformat(sep=" ", timespec="seconds") if isinstance(value, datetime) else value.isoformat()
if isinstance(value, Decimal):
return float(value)
if isinstance(value, bytes):
try:
return value.decode("utf-8")
except UnicodeDecodeError:
return value.hex()
if isinstance(value, list):
return [_to_jsonable(v) for v in value]
if isinstance(value, dict):
return {str(k): _to_jsonable(v) for k, v in value.items()}
return str(value)
def _rows_to_jsonable(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
return [{k: _to_jsonable(v) for k, v in row.items()} for row in rows]
def query_sorting_records_db(
*,
line_id: str | None,
grade: str | None,
batch_id: int | None,
start_date: str | None,
end_date: str | None,
page: int = 1,
page_size: int = 20,
) -> str:
trace_id = uuid.uuid4().hex[:8]
if page < 1:
return json.dumps({"ok": False, "trace_id": trace_id, "error": "page must be >= 1"}, ensure_ascii=False)
if page_size < 1 or page_size > 100:
return json.dumps({"ok": False, "trace_id": trace_id, "error": "page_size must be 1..100"}, ensure_ascii=False)
if line_id is not None:
line_id = line_id.strip()
if not line_id:
return json.dumps({"ok": False, "trace_id": trace_id, "error": "line_id is empty"}, ensure_ascii=False)
if grade is not None:
grade = grade.strip().upper()
if grade not in _ALLOWED_GRADES:
return json.dumps({"ok": False, "trace_id": trace_id, "error": "grade must be A/B/C/D/E"}, ensure_ascii=False)
def _parse_ymd(s: str) -> datetime:
return datetime.strptime(s, "%Y-%m-%d")
where_parts: list[str] = []
params_list: list[Any] = []
if line_id is not None:
where_parts.append("line_id = %s")
params_list.append(line_id)
if grade is not None:
where_parts.append("grade = %s")
params_list.append(grade)
if batch_id is not None:
where_parts.append("batch_id = %s")
params_list.append(batch_id)
if start_date is not None:
try:
start_dt = _parse_ymd(start_date)
except ValueError:
return json.dumps({"ok": False, "trace_id": trace_id, "error": "start_date must be YYYY-MM-DD"}, ensure_ascii=False)
where_parts.append("created_at >= %s")
params_list.append(start_dt)
if end_date is not None:
try:
end_dt_exclusive = _parse_ymd(end_date) + timedelta(days=1)
except ValueError:
return json.dumps({"ok": False, "trace_id": trace_id, "error": "end_date must be YYYY-MM-DD"}, ensure_ascii=False)
where_parts.append("created_at < %s")
params_list.append(end_dt_exclusive)
where_sql = (" WHERE " + " AND ".join(where_parts)) if where_parts else ""
db = MySQLClient()
count_sql = f"SELECT COUNT(*) AS total FROM sorting_record{where_sql}"
count_res = db.select(count_sql, tuple(params_list))
total = int(count_res.rows[0]["total"]) if count_res.rows else 0
offset = (page - 1) * page_size
data_sql = f"""
SELECT id, batch_id, fruit_id, line_id, diameter_mm, grade, overall_score, created_at, image_path
FROM sorting_record
{where_sql}
ORDER BY created_at DESC
LIMIT %s OFFSET %s
"""
data_params = tuple(params_list + [page_size, offset])
data_res = db.select(data_sql, data_params)
return json.dumps(
{
"ok": True,
"trace_id": data_res.trace_id,
"page": page,
"page_size": page_size,
"total": total,
"records": _rows_to_jsonable(data_res.rows),
},
ensure_ascii=False,
)
关键点解释(直连 DB 时必须做到):
- 不暴露“任意 SQL”:只提供业务参数(line_id/grade/日期范围/page/page_size),SQL 模板写死在工具内部。
- 参数化查询:所有外部输入都通过
%s占位符绑定,避免字符串拼接注入。 - 白名单 + 上限:
grade白名单;page_size <= 100,避免拉爆数据库。 - 最小权限:Tool 依然使用
greensort_ro(只读账号),并建议把 DB 网络访问限制在后端或受控环境(堡垒机/内网)。
运行建议(包导入方式):
cd 08_greensort_system/backend
python -c "from tools.query_sorting_records_db import query_sorting_records_db; print(query_sorting_records_db(line_id='LINE-01', grade='A', batch_id=1, start_date=None, end_date=None, page=1, page_size=10))"
1. 注入风险的典型坏例子(不要这么写)
项目路径:08_greensort_system/backend/examples/bad_sql_concat.py(反例)
sql = "SELECT * FROM sorting_record WHERE fruit_id = '" + fruit_id + "'"
为什么危险:
-
如果
fruit_id被注入为APPLE-20260409-000001' OR '1'='1,就可能绕过过滤条件,读取大量不该读的数据。 -
更严重时会拼出多语句(带
;)导致结构破坏(取决于驱动与配置)。 -
参数化:
WHERE fruit_id = %s+params=(fruit_id,) -
白名单:对“等级/设备状态/列名映射”等只允许一组固定值
-
最小权限:工具只用
greensort_ro,且只给 SELECT -
禁止多语句:拒绝
;与注释符号 -
是否使用只读账号
greensort_ro? -
是否所有用户输入都通过参数化绑定(没有字符串拼接)?
-
是否限制了
page_size、时间范围或返回行数上限? -
是否对枚举字段(grade/status/line_id)做了白名单?
-
是否保留了
trace_id(便于追踪与复验)? -
让 AI 先生成一版连接代码,再让学生对照“安全自检清单”逐条审计并修正。
-
让 AI 优化一条聚合查询(例如按设备统计良品率),并要求 AI 给出索引建议与验证方法(
EXPLAIN)。
报错排查演练(学生必做 1 次):
-
制造一个典型错误:把
.env的MYSQL_DATABASE改成不存在的库名,观察错误信息,再按模板 4 让 AI 给出定位→修复→预防。 -
你写的不是“能查就行”,而是“能查、可复验、可审计、默认安全”的工具。
-
常见错误与快速定位:
-
Access denied for user:账号/密码或权限;确认是否用只读账号却执行了写入。 -
Can't connect to MySQL server:地址/端口/防火墙/服务未启动;先用命令行或本机 ping 验证连通性。 -
“跑一会儿就卡住”:连接未关闭导致连接池耗尽;检查
cnx.close()是否在finally。
- 配置 MySQL 数据库连接(含连接池、权限设置):能用只读账号成功连接并跑通自测。
- 分析分拣场景业务数据结构:能说明
sorting_record的关键字段含义(至少 batch_id/fruit_id/line_id/diameter_mm/grade/overall_score/created_at),并能指出quality_standard与equipment_status各自解决什么问题。 - 编写 2–3 个基础 SQL 查询语句:至少 1 条条件查询(对齐 API 的筛选字段:line_id/grade/batch_id/日期范围)+ 1 条聚合查询(品级分布/平均周期等)(可选 1 条联表追溯:记录→批次→标准)。
- 开发数据库查询工具:实现连接封装与查询执行,结果以
list[dict]或 JSON 返回。 - 添加 SQL 注入防护逻辑:参数化 + 白名单 + 行数限制(至少满足两项)。
- 使用 AI 生成连接代码并优化自研查询语句:保留“原版→修正版→理由→自测证据”。
- 记录安全校验要点与问题:至少 3 条(写在提交说明里)。
- 生成 MySQL 数据库连接配置脚本(带安全校验与自测 SQL)。
- 结合分拣场景分拣记录/批次/品质标准数据,生成基础 SQL 查询语句(含优化建议与索引建议)。
- 生成数据库查询工具核心代码(含 SQL 注入防护、只读约束、结构化输出)。
- 针对学生提交的连接报错,分析原因并给出分步解决方案与复验步骤。
- 讲解数据库交互安全校验的核心方法:参数化、白名单、最小权限、审计追踪。
课程思政(数据安全与工程伦理)
- 工业数据涉及质量、产线效率、设备健康与合规审计,一次越权查询或注入漏洞可能造成数据泄露与生产风险。
- “最小权限 + 默认只读 + 可审计 trace_id + 输入白名单”不仅是技术选择,也是工程伦理与安全责任的体现。
- 用 AI 提效要有边界:AI 生成代码必须经过安全审计与自测,不能把“看起来能跑”当作“可上线”。
作业:不布置
1)提交数据库连接配置截图(含连接成功日志)及连接代码(带注释)。
2)提交分拣场景 SQL 查询语句及查询结果截图,附语句优化说明(含 AI 优化建议)。
3)提交数据库查询工具代码(含安全校验逻辑),附开发思路说明。